package cn.movie;

import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Demo02_ImportToHBaseMR extends Configured implements Tool{
	@Override
	public int run(String[] args) throws Exception {
		// 输入
		if (args.length != 1) {
			System.out.println("usage :in");
			return -1;
		}
		Configuration config = HBaseConfiguration.create();
		config.set("fs.defaultFS", "hdfs://hadoop31:8020");
		config.set("hbase.zookeeper.quorum", "hadoop31:2181");
		//
		Job job = Job.getInstance(config);
		job.setJarByClass(getClass());
		//
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		//
		FileInputFormat.addInputPath(job, new Path(args[0]));// 设置输入的源
		TableMapReduceUtil.initTableReducerJob("movie", MyReducer.class, job);
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int code = ToolRunner.run(new Demo02_ImportToHBaseMR(), args);
		System.exit(code);
	}

	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		Text tt = new Text();
		
		@Override
		public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String uu = UUID.randomUUID().toString();
			uu = uu.replace("-", "");
			String[] strs = value.toString().split("\\s+");
			tt.set(strs[1] + "," + strs[2] + "," + strs[0]+","+strs[3]+","+strs[4]+","+strs[5]);
			context.write(new Text(uu), tt);
		}
	}

	// ：S001,["S001,Jack,45","S001,Mary,46"],"S001"
	public static class MyReducer extends TableReducer<Text, Text, Text> {
		@Override
		public void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Mutation>.Context ctx)
				throws IOException, InterruptedException {
			for (Text t : value) {
				String[] strs = t.toString().split(",");
				Put put = new Put(Bytes.toBytes(strs[0]));
				put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(strs[1]));
				ctx.write(key, put);
				// 再保存age
				Put put2 = new Put(Bytes.toBytes(strs[0]));
				put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(Long.parseLong(strs[2])));
				ctx.write(key, put2);
			}
		}
	}
}
